package com.hy.sync.binlog.kafka;
import java.util.Date;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * kafka
 */
@Component
public class KafkaSender {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * create topic
     * @param host
     * @param topic
     * @param partNum
     * @param repeatNum
     */
    public void createTopic(String host,String topic,int partNum,short repeatNum){

        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,host);

        NewTopic newTopic = new NewTopic(topic, partNum, repeatNum);
        AdminClient adminClient = AdminClient.create(properties);

        List<NewTopic> newTopics = Arrays.asList(newTopic);
        adminClient.createTopics(newTopics);
        adminClient.close(10, TimeUnit.SECONDS);
    }

    /**
     * send msg
     * @param topic
     * @param msg
     */
    public void sendMessage(String topic,String msg){
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(msg);
        message.setSendTime(new Date());
        kafkaTemplate.send(topic,msg);
    }



}